diff --git a/db/column_family.cc b/db/column_family.cc index 3d6b79e7a..540e36a70 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -13,6 +13,7 @@ #include #include +#include "db/db_impl.h" #include "db/version_set.h" #include "db/internal_stats.h" #include "db/compaction_picker.h" @@ -22,6 +23,27 @@ namespace rocksdb { +ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(ColumnFamilyData* cfd, + DBImpl* db, port::Mutex* mutex) + : cfd_(cfd), db_(db), mutex_(mutex) { + if (cfd_ != nullptr) { + cfd_->Ref(); + } +} + +ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() { + if (cfd_ != nullptr) { + DBImpl::DeletionState deletion_state; + mutex_->Lock(); + if (cfd_->Unref()) { + delete cfd_; + } + db_->FindObsoleteFiles(deletion_state, false, true); + mutex_->Unlock(); + db_->PurgeObsoleteFiles(deletion_state); + } +} + namespace { // Fix user-supplied options to be reasonable template @@ -134,11 +156,14 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, Version* dummy_versions, Cache* table_cache, const ColumnFamilyOptions& options, const DBOptions* db_options, - const EnvOptions& storage_options) + const EnvOptions& storage_options, + ColumnFamilySet* column_family_set) : id_(id), name_(name), dummy_versions_(dummy_versions), current_(nullptr), + refs_(0), + dropped_(false), internal_comparator_(options.comparator), internal_filter_policy_(options.filter_policy), options_(SanitizeOptions(&internal_comparator_, &internal_filter_policy_, @@ -151,7 +176,10 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, next_(nullptr), prev_(nullptr), log_number_(0), - need_slowdown_for_num_level0_files_(false) { + need_slowdown_for_num_level0_files_(false), + column_family_set_(column_family_set) { + Ref(); + // if dummy_versions is nullptr, then this is a dummy column family. if (dummy_versions != nullptr) { internal_stats_.reset(new InternalStats(options.num_levels, db_options->env, @@ -172,7 +200,15 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, } } +// DB mutex held ColumnFamilyData::~ColumnFamilyData() { + assert(refs_ == 0); + // remove from linked list + auto prev = prev_; + auto next = next_; + prev->next_ = next; + next->prev_ = prev; + if (super_version_ != nullptr) { bool is_last_reference __attribute__((unused)); is_last_reference = super_version_->Unref(); @@ -180,6 +216,17 @@ ColumnFamilyData::~ColumnFamilyData() { super_version_->Cleanup(); delete super_version_; } + + // it's nullptr for dummy CFD + if (column_family_set_ != nullptr) { + // remove from column_family_set + column_family_set_->DropColumnFamily(this); + } + + if (current_ != nullptr) { + current_->Unref(); + } + if (dummy_versions_ != nullptr) { // List must be empty assert(dummy_versions_->next_ == dummy_versions_); @@ -248,24 +295,25 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, : max_column_family_(0), dummy_cfd_(new ColumnFamilyData(dbname, 0, "", nullptr, nullptr, ColumnFamilyOptions(), db_options, - storage_options_)), + storage_options_, nullptr)), db_name_(dbname), db_options_(db_options), storage_options_(storage_options), table_cache_(table_cache), spin_lock_(ATOMIC_FLAG_INIT) { // initialize linked list - dummy_cfd_->prev_.store(dummy_cfd_); - dummy_cfd_->next_.store(dummy_cfd_); + dummy_cfd_->prev_ = dummy_cfd_; + dummy_cfd_->next_ = dummy_cfd_; } ColumnFamilySet::~ColumnFamilySet() { - for (auto& cfd : column_family_data_) { - delete cfd.second; - } - for (auto& cfd : droppped_column_families_) { + while (column_family_data_.size() > 0) { + // cfd destructor will delete itself from column_family_data_ + auto cfd = column_family_data_.begin()->second; + cfd->Unref(); delete cfd; } + dummy_cfd_->Unref(); delete dummy_cfd_; } @@ -303,39 +351,36 @@ uint32_t ColumnFamilySet::GetNextColumnFamilyID() { return ++max_column_family_; } +// under a DB mutex ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( const std::string& name, uint32_t id, Version* dummy_versions, const ColumnFamilyOptions& options) { assert(column_families_.find(name) == column_families_.end()); - column_families_.insert({name, id}); ColumnFamilyData* new_cfd = new ColumnFamilyData(db_name_, id, name, dummy_versions, table_cache_, - options, db_options_, storage_options_); + options, db_options_, storage_options_, this); + Lock(); + column_families_.insert({name, id}); column_family_data_.insert({id, new_cfd}); + Unlock(); max_column_family_ = std::max(max_column_family_, id); // add to linked list - new_cfd->next_.store(dummy_cfd_); - auto prev = dummy_cfd_->prev_.load(); - new_cfd->prev_.store(prev); - prev->next_.store(new_cfd); - dummy_cfd_->prev_.store(new_cfd); + new_cfd->next_ = dummy_cfd_; + auto prev = dummy_cfd_->prev_; + new_cfd->prev_ = prev; + prev->next_ = new_cfd; + dummy_cfd_->prev_ = new_cfd; return new_cfd; } -void ColumnFamilySet::DropColumnFamily(uint32_t id) { - assert(id != 0); - auto cfd_iter = column_family_data_.find(id); +// under a DB mutex +void ColumnFamilySet::DropColumnFamily(ColumnFamilyData* cfd) { + auto cfd_iter = column_family_data_.find(cfd->GetID()); assert(cfd_iter != column_family_data_.end()); - auto cfd = cfd_iter->second; - column_families_.erase(cfd->GetName()); - cfd->current()->Unref(); - droppped_column_families_.push_back(cfd); + Lock(); column_family_data_.erase(cfd_iter); - // remove from linked list - auto prev = cfd->prev_.load(); - auto next = cfd->next_.load(); - prev->next_.store(next); - next->prev_.store(prev); + column_families_.erase(cfd->GetName()); + Unlock(); } void ColumnFamilySet::Lock() { @@ -347,8 +392,11 @@ void ColumnFamilySet::Lock() { void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); } bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) { + // maybe outside of db mutex, should lock + column_family_set_->Lock(); current_ = column_family_set_->GetColumnFamily(column_family_id); - handle_.id = column_family_id; + column_family_set_->Unlock(); + handle_.SetCFD(current_); return current_ != nullptr; } @@ -367,10 +415,9 @@ const Options* ColumnFamilyMemTablesImpl::GetFullOptions() const { return current_->full_options(); } -const ColumnFamilyHandle& ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() - const { +ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() { assert(current_ != nullptr); - return handle_; + return &handle_; } } // namespace rocksdb diff --git a/db/column_family.h b/db/column_family.h index 999433add..51736cc7f 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -30,6 +30,35 @@ class CompactionPicker; class Compaction; class InternalKey; class InternalStats; +class ColumnFamilyData; +class DBImpl; + +class ColumnFamilyHandleImpl : public ColumnFamilyHandle { + public: + // create while holding the mutex + ColumnFamilyHandleImpl(ColumnFamilyData* cfd, DBImpl* db, port::Mutex* mutex); + // destroy without mutex + virtual ~ColumnFamilyHandleImpl(); + virtual ColumnFamilyData* cfd() const { return cfd_; } + + private: + ColumnFamilyData* cfd_; + DBImpl* db_; + port::Mutex* mutex_; +}; + +// does not ref-count cfd_ +class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl { + public: + ColumnFamilyHandleInternal() + : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr) {} + + void SetCFD(ColumnFamilyData* cfd) { internal_cfd_ = cfd; } + virtual ColumnFamilyData* cfd() const override { return internal_cfd_; } + + private: + ColumnFamilyData* internal_cfd_; +}; // holds references to memtable, all immutable memtables and version struct SuperVersion { @@ -63,12 +92,35 @@ extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, const InternalFilterPolicy* ipolicy, const ColumnFamilyOptions& src); +class ColumnFamilySet; + // column family metadata. not thread-safe. should be protected by db_mutex class ColumnFamilyData { public: + ~ColumnFamilyData(); + uint32_t GetID() const { return id_; } const std::string& GetName() { return name_; } + // DB mutex held for all these + void Ref() { ++refs_; } + // will just decrease reference count to 0, but will not delete it. returns + // true if the ref count was decreased to zero and needs to be cleaned up by + // the caller + bool Unref() { + assert(refs_ > 0); + return --refs_ == 0; + } + bool Dead() { return refs_ == 0; } + + // SetDropped() and IsDropped() are thread-safe + void SetDropped() { + // can't drop default CF + assert(id_ != 0); + dropped_.store(true); + } + bool IsDropped() const { return dropped_.load(); } + int NumberLevels() const { return options_.num_levels; } void SetLogNumber(uint64_t log_number) { log_number_ = log_number; } @@ -126,16 +178,19 @@ class ColumnFamilyData { const std::string& name, Version* dummy_versions, Cache* table_cache, const ColumnFamilyOptions& options, const DBOptions* db_options, - const EnvOptions& storage_options); - ~ColumnFamilyData(); + const EnvOptions& storage_options, + ColumnFamilySet* column_family_set); - ColumnFamilyData* next() { return next_.load(); } + ColumnFamilyData* next() { return next_; } uint32_t id_; const std::string name_; Version* dummy_versions_; // Head of circular doubly-linked list of versions. Version* current_; // == dummy_versions->prev_ + int refs_; // outstanding references to ColumnFamilyData + std::atomic dropped_; // true if client dropped it + const InternalKeyComparator internal_comparator_; const InternalFilterPolicy internal_filter_policy_; @@ -157,8 +212,8 @@ class ColumnFamilyData { // pointers for a circular linked list. we use it to support iterations // that can be concurrent with writes - std::atomic next_; - std::atomic prev_; + ColumnFamilyData* next_; + ColumnFamilyData* prev_; // This is the earliest log file number that contains data from this // Column Family. All earlier log files must be ignored and not @@ -172,6 +227,8 @@ class ColumnFamilyData { // An object that keeps all the compaction stats // and picks the next compaction std::unique_ptr compaction_picker_; + + ColumnFamilySet* column_family_set_; }; // Thread safe only for reading without a writer. All access should be @@ -183,7 +240,10 @@ class ColumnFamilySet { explicit iterator(ColumnFamilyData* cfd) : current_(cfd) {} iterator& operator++() { - current_ = current_->next(); + // dummy is never dead, so this will never be infinite + do { + current_ = current_->next(); + } while (current_->Dead()); return *this; } bool operator!=(const iterator& other) { @@ -216,26 +276,31 @@ class ColumnFamilySet { ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id, Version* dummy_version, const ColumnFamilyOptions& options); - void DropColumnFamily(uint32_t id); + void DropColumnFamily(ColumnFamilyData* cfd); iterator begin() { return iterator(dummy_cfd_->next()); } iterator end() { return iterator(dummy_cfd_); } // ColumnFamilySet has interesting thread-safety requirements - // * CreateColumnFamily() or DropColumnFamily() -- need to Lock() and also - // execute inside of DB mutex - // * Iterate -- without any locks - // * GetDefault(), GetColumnFamily(), Exists(), GetID(), - // GetNextColumnFamilyID() -- either inside of DB mutex or call Lock() + // * CreateColumnFamily() or DropColumnFamily() -- need to protect by DB + // mutex. Inside, column_family_data_ and column_families_ will be protected + // by Lock() and Unlock() + // * Iterate -- hold DB mutex, but you can release it in the body of + // iteration. If you release DB mutex in body, reference the column + // family before the mutex and unreference after you unlock, since the column + // family might get dropped when you release the DB mutex. + // * GetDefault(), GetColumnFamily(), Exists(), GetID() -- either inside of DB + // mutex or call Lock() + // * GetNextColumnFamilyID() -- inside of DB mutex void Lock(); void Unlock(); private: + // when mutating: 1. DB mutex locked first, 2. spinlock locked second + // when reading, either: 1. lock DB mutex, or 2. lock spinlock + // (if both, respect the ordering to avoid deadlock!) std::unordered_map column_families_; std::unordered_map column_family_data_; - // we need to keep them alive because we still can't control the lifetime of - // all of column family data members (options for example) - std::vector droppped_column_families_; uint32_t max_column_family_; ColumnFamilyData* dummy_cfd_; @@ -266,12 +331,12 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { virtual const Options* GetFullOptions() const override; // Returns column family handle for the selected column family - virtual const ColumnFamilyHandle& GetColumnFamilyHandle() const override; + virtual ColumnFamilyHandle* GetColumnFamilyHandle() override; private: ColumnFamilySet* column_family_set_; ColumnFamilyData* current_; - ColumnFamilyHandle handle_; + ColumnFamilyHandleInternal handle_; }; } // namespace rocksdb diff --git a/db/column_family_test.cc b/db/column_family_test.cc index dbf8148d4..c4fcc03f6 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -31,6 +31,10 @@ class ColumnFamilyTest { } void Close() { + for (auto h : handles_) { + delete h; + } + handles_.clear(); delete db_; db_ = nullptr; } @@ -45,6 +49,10 @@ class ColumnFamilyTest { } void Destroy() { + for (auto h : handles_) { + delete h; + } + handles_.clear(); delete db_; db_ = nullptr; ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_))); @@ -59,6 +67,14 @@ class ColumnFamilyTest { } } + void DropColumnFamilies(const vector& cfs) { + for (auto cf : cfs) { + ASSERT_OK(db_->DropColumnFamily(handles_[cf])); + delete handles_[cf]; + handles_[cf] = nullptr; + } + } + Status Put(int cf, const string& key, const string& value) { return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value)); } @@ -111,6 +127,12 @@ class ColumnFamilyTest { return result; } + int CountLiveFiles(int cf) { + std::vector metadata; + db_->GetLiveFilesMetaData(&metadata); + return static_cast(metadata.size()); + } + // Do n memtable flushes, each of which produces an sstable // covering the range [small,large]. void MakeTables(int cf, int n, const string& small, @@ -146,7 +168,7 @@ class ColumnFamilyTest { ASSERT_OK(destfile->Close()); } - vector handles_; + vector handles_; ColumnFamilyOptions column_family_options_; DBOptions db_options_; string dbname_; @@ -156,16 +178,9 @@ class ColumnFamilyTest { TEST(ColumnFamilyTest, AddDrop) { ASSERT_OK(Open({"default"})); - ColumnFamilyHandle handles[4]; - ASSERT_OK( - db_->CreateColumnFamily(column_family_options_, "one", &handles[0])); - ASSERT_OK( - db_->CreateColumnFamily(column_family_options_, "two", &handles[1])); - ASSERT_OK( - db_->CreateColumnFamily(column_family_options_, "three", &handles[2])); - ASSERT_OK(db_->DropColumnFamily(handles[1])); - ASSERT_OK( - db_->CreateColumnFamily(column_family_options_, "four", &handles[3])); + CreateColumnFamilies({"one", "two", "three"}); + DropColumnFamilies({2}); + CreateColumnFamilies({"four"}); Close(); ASSERT_TRUE(Open({"default"}).IsInvalidArgument()); ASSERT_OK(Open({"default", "one", "three", "four"})); @@ -177,6 +192,33 @@ TEST(ColumnFamilyTest, AddDrop) { ASSERT_TRUE(families == vector({"default", "four", "one", "three"})); } +TEST(ColumnFamilyTest, DropTest) { + // first iteration - dont reopen DB before dropping + // second iteration - reopen DB before dropping + for (int iter = 0; iter < 2; ++iter) { + ASSERT_OK(Open({"default"})); + CreateColumnFamilies({"pikachu"}); + Close(); + ASSERT_OK(Open({"default", "pikachu"})); + for (int i = 0; i < 100; ++i) { + ASSERT_OK(Put(1, std::to_string(i), "bar" + std::to_string(i))); + } + ASSERT_OK(Flush(1)); + + if (iter == 1) { + Close(); + ASSERT_OK(Open({"default", "pikachu"})); + } + ASSERT_EQ("bar1", Get(1, "1")); + + ASSERT_EQ(CountLiveFiles(1), 1); + DropColumnFamilies({1}); + // make sure that all files are deleted when we drop the column family + ASSERT_EQ(CountLiveFiles(1), 0); + Destroy(); + } +} + TEST(ColumnFamilyTest, ReadWrite) { ASSERT_OK(Open({"default"})); CreateColumnFamilies({"one", "two"}); diff --git a/db/compaction.cc b/db/compaction.cc index 279481d68..44e0dea84 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -43,6 +43,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level, is_full_compaction_(false), level_ptrs_(std::vector(number_levels_)) { + cfd_->Ref(); input_version_->Ref(); edit_ = new VersionEdit(); edit_->SetColumnFamily(cfd_->GetID()); @@ -56,6 +57,11 @@ Compaction::~Compaction() { if (input_version_ != nullptr) { input_version_->Unref(); } + if (cfd_ != nullptr) { + if (cfd_->Unref()) { + delete cfd_; + } + } } bool Compaction::IsTrivialMove() const { @@ -168,6 +174,12 @@ void Compaction::ReleaseInputs() { input_version_->Unref(); input_version_ = nullptr; } + if (cfd_ != nullptr) { + if (cfd_->Unref()) { + delete cfd_; + } + cfd_ = nullptr; + } } void Compaction::ReleaseCompactionFiles(Status status) { diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index c0284f153..3a8dcd564 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -74,7 +74,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, // Make a set of all of the live *.sst files std::set live; - default_cfd_->current()->AddLiveFiles(&live); + default_cf_handle_->cfd()->current()->AddLiveFiles(&live); ret.clear(); ret.reserve(live.size() + 2); //*.sst + CURRENT + MANIFEST diff --git a/db/db_impl.cc b/db/db_impl.cc index afc425b59..1ce682424 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -42,7 +42,6 @@ #include "rocksdb/cache.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" -#include "rocksdb/column_family.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/statistics.h" @@ -218,6 +217,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) shutting_down_(nullptr), bg_cv_(&mutex_), logfile_number_(0), + default_cf_handle_(nullptr), tmp_batch_(), bg_compaction_scheduled_(0), bg_manual_only_(0), @@ -270,14 +270,24 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) DBImpl::~DBImpl() { // Wait for background work to finish + mutex_.Lock(); if (flush_on_destroy_) { + autovector to_delete; for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->mem()->GetFirstSequenceNumber() != 0) { + cfd->Ref(); + mutex_.Unlock(); FlushMemTable(cfd, FlushOptions()); + mutex_.Lock(); + if (cfd->Unref()) { + to_delete.push_back(cfd); + } } } + for (auto cfd : to_delete) { + delete cfd; + } } - mutex_.Lock(); shutting_down_.Release_Store(this); // Any non-nullptr value is ok while (bg_compaction_scheduled_ || bg_flush_scheduled_ || @@ -285,6 +295,10 @@ DBImpl::~DBImpl() { bg_cv_.Wait(); } mutex_.Unlock(); + if (default_cf_handle_ != nullptr) { + // we need to delete handle outside of lock because it does its own locking + delete default_cf_handle_; + } if (db_lock_ != nullptr) { env_->UnlockFile(db_lock_); @@ -816,7 +830,8 @@ Status DBImpl::Recover( Status s = versions_->Recover(column_families); if (s.ok()) { SequenceNumber max_sequence(0); - default_cfd_ = versions_->GetColumnFamilySet()->GetDefault(); + default_cf_handle_ = new ColumnFamilyHandleImpl( + versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); // Recover from all newer log files than the ones named in the // descriptor (new log files may have been added by the previous @@ -891,6 +906,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, mutex_.AssertHeld(); std::unordered_map version_edits; + // no need to refcount because iteration is under mutex for (auto cfd : *versions_->GetColumnFamilySet()) { VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); @@ -934,7 +950,6 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, } WriteBatchInternal::SetContents(&batch, record); - // No need to lock ColumnFamilySet here since its under a DB mutex status = WriteBatchInternal::InsertInto( &batch, column_family_memtables_.get(), log_number); @@ -950,6 +965,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, } if (!read_only) { + // no need to refcount since client still doesn't have access + // to the DB and can not drop column families while we iterate for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->mem()->ApproximateMemoryUsage() > cfd->options()->write_buffer_size) { @@ -973,6 +990,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, } if (!read_only) { + // no need to refcount since client still doesn't have access + // to the DB and can not drop column families while we iterate for (auto cfd : *versions_->GetColumnFamilySet()) { auto iter = version_edits.find(cfd->GetID()); assert(iter != version_edits.end()); @@ -1198,10 +1217,8 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, // This will release and re-acquire the mutex. Status s = WriteLevel0Table(cfd, mems, edit, &file_number); - if (s.ok() && shutting_down_.Acquire_Load()) { - s = Status::IOError( - "Database shutdown started during memtable compaction" - ); + if (s.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) { + s = Status::IOError("Column family closed during memtable flush"); } // Replace immutable memtable with the generated Table @@ -1229,15 +1246,11 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, return s; } -Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family, +Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end, bool reduce_level, int target_level) { - mutex_.Lock(); - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); - mutex_.Unlock(); - // this is asserting because client calling DB methods with undefined - // ColumnFamilyHandle is undefined behavior. - assert(cfd != nullptr); + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); Status s = FlushMemTable(cfd, FlushOptions()); if (!s.ok()) { @@ -1367,38 +1380,25 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { return status; } -int DBImpl::NumberLevels(const ColumnFamilyHandle& column_family) { - mutex_.Lock(); - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); - mutex_.Unlock(); - assert(cfd != nullptr); - return cfd->NumberLevels(); +int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) { + auto cfh = reinterpret_cast(column_family); + return cfh->cfd()->NumberLevels(); } -int DBImpl::MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) { - mutex_.Lock(); - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); - mutex_.Unlock(); - assert(cfd != nullptr); - return cfd->options()->max_mem_compaction_level; +int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) { + auto cfh = reinterpret_cast(column_family); + return cfh->cfd()->options()->max_mem_compaction_level; } -int DBImpl::Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) { - mutex_.Lock(); - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); - mutex_.Unlock(); - assert(cfd != nullptr); - return cfd->options()->level0_stop_writes_trigger; +int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { + auto cfh = reinterpret_cast(column_family); + return cfh->cfd()->options()->level0_stop_writes_trigger; } Status DBImpl::Flush(const FlushOptions& options, - const ColumnFamilyHandle& column_family) { - mutex_.Lock(); - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); - mutex_.Unlock(); - assert(cfd != nullptr); - - return FlushMemTable(cfd, options); + ColumnFamilyHandle* column_family) { + auto cfh = reinterpret_cast(column_family); + return FlushMemTable(cfh->cfd(), options); } SequenceNumber DBImpl::GetLatestSequenceNumber() const { @@ -1666,11 +1666,12 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, Status DBImpl::TEST_CompactRange(int level, const Slice* begin, const Slice* end) { + auto default_cfd = default_cf_handle_->cfd(); int output_level = - (default_cfd_->options()->compaction_style == kCompactionStyleUniversal) + (default_cfd->options()->compaction_style == kCompactionStyleUniversal) ? level : level + 1; - return RunManualCompaction(default_cfd_, level, output_level, begin, end); + return RunManualCompaction(default_cfd, level, output_level, begin, end); } Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, @@ -1698,11 +1699,11 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { } Status DBImpl::TEST_FlushMemTable() { - return FlushMemTable(default_cfd_, FlushOptions()); + return FlushMemTable(default_cf_handle_->cfd(), FlushOptions()); } Status DBImpl::TEST_WaitForFlushMemTable() { - return WaitForFlushMemTable(default_cfd_); + return WaitForFlushMemTable(default_cf_handle_->cfd()); } Status DBImpl::TEST_WaitForCompact() { @@ -1728,6 +1729,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { // DB is being deleted; no more background compactions } else { bool is_flush_pending = false; + // no need to refcount since we're under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->imm()->IsFlushPending()) { is_flush_pending = true; @@ -1744,6 +1746,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } } bool is_compaction_needed = false; + // no need to refcount since we're under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->current()->NeedsCompaction()) { is_compaction_needed = true; @@ -1774,17 +1777,38 @@ void DBImpl::BGWorkCompaction(void* db) { Status DBImpl::BackgroundFlush(bool* madeProgress, DeletionState& deletion_state) { - Status stat; + mutex_.AssertHeld(); + // call_status is failure if at least one flush was a failure. even if + // flushing one column family reports a failure, we will continue flushing + // other column families. however, call_status will be a failure in that case. + Status call_status; + autovector to_delete; + // refcounting in iteration for (auto cfd : *versions_->GetColumnFamilySet()) { - while (stat.ok() && cfd->imm()->IsFlushPending()) { + if (cfd->IsDropped()) { + continue; + } + cfd->Ref(); + Status flush_status; + while (flush_status.ok() && cfd->imm()->IsFlushPending()) { Log(options_.info_log, "BackgroundCallFlush doing FlushMemTableToOutputFile with column " "family %u, flush slots available %d", cfd->GetID(), options_.max_background_flushes - bg_flush_scheduled_); - stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state); + flush_status = + FlushMemTableToOutputFile(cfd, madeProgress, deletion_state); + } + if (call_status.ok() && !flush_status.ok()) { + call_status = flush_status; + } + if (cfd->Unref()) { + to_delete.push_back(cfd); } } - return stat; + for (auto cfd : to_delete) { + delete cfd; + } + return call_status; } void DBImpl::BackgroundCallFlush() { @@ -1835,7 +1859,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() { uint64_t DBImpl::TEST_GetLevel0TotalSize() { MutexLock l(&mutex_); - return default_cfd_->current()->NumLevelBytes(0); + return default_cf_handle_->cfd()->current()->NumLevelBytes(0); } void DBImpl::BackgroundCallCompaction() { @@ -1921,8 +1945,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, ? "(end)" : manual_end->DebugString().c_str())); } else { + // no need to refcount in iteration since it's always under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->options()->disable_auto_compactions) { + if (!cfd->options()->disable_auto_compactions && !cfd->IsDropped()) { c.reset(cfd->PickCompaction()); if (c != nullptr) { // update statistics @@ -2302,7 +2327,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, compaction_filter = compaction_filter_from_factory.get(); } - for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { + while (input->Valid() && !shutting_down_.Acquire_Load() && + !cfd->IsDropped()) { Slice key = input->key(); Slice value = input->value(); @@ -2546,8 +2572,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } } - if (status.ok() && shutting_down_.Acquire_Load()) { - status = Status::IOError("Database shutdown started during compaction"); + if (status.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) { + status = Status::IOError("Column family closing started during compaction"); } if (status.ok() && compact->builder != nullptr) { status = FinishCompactionOutputFile(compact, input.get()); @@ -2638,8 +2664,7 @@ struct IterState { static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); - DBImpl::DeletionState deletion_state(state->db->GetOptions(). - max_write_buffer_number); + DBImpl::DeletionState deletion_state; bool need_cleanup = state->super_version->Unref(); if (need_cleanup) { @@ -2677,11 +2702,17 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, return internal_iter; } +ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { + return default_cf_handle_; +} + Iterator* DBImpl::TEST_NewInternalIterator() { mutex_.Lock(); - SuperVersion* super_version = default_cfd_->GetSuperVersion()->Ref(); + SuperVersion* super_version = + default_cf_handle_->cfd()->GetSuperVersion()->Ref(); mutex_.Unlock(); - return NewInternalIterator(ReadOptions(), default_cfd_, super_version); + return NewInternalIterator(ReadOptions(), default_cf_handle_->cfd(), + super_version); } std::pair DBImpl::GetTailingIteratorPair( @@ -2725,11 +2756,11 @@ std::pair DBImpl::GetTailingIteratorPair( int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { MutexLock l(&mutex_); - return default_cfd_->current()->MaxNextLevelOverlappingBytes(); + return default_cf_handle_->cfd()->current()->MaxNextLevelOverlappingBytes(); } Status DBImpl::Get(const ReadOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, std::string* value) { return GetImpl(options, column_family, key, value); } @@ -2762,18 +2793,16 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, } Status DBImpl::GetImpl(const ReadOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, std::string* value, - bool* value_found) { + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool* value_found) { StopWatch sw(env_, options_.statistics.get(), DB_GET, false); StopWatchNano snapshot_timer(env_, false); StartPerfTimer(&snapshot_timer); + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); + mutex_.Lock(); - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); - // this is asserting because client calling DB methods with undefined - // ColumnFamilyHandle is undefined behavior. - assert(cfd != nullptr); SuperVersion* get_version = cfd->GetSuperVersion()->Ref(); mutex_.Unlock(); @@ -2851,7 +2880,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, std::vector DBImpl::MultiGet( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, const std::vector& keys, std::vector* values) { StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false); @@ -2869,8 +2898,12 @@ std::vector DBImpl::MultiGet( std::unordered_map multiget_cf_data; // fill up and allocate outside of mutex for (auto cf : column_family) { - if (multiget_cf_data.find(cf.id) == multiget_cf_data.end()) { - multiget_cf_data.insert({cf.id, new MultiGetColumnFamilyData()}); + auto cfh = reinterpret_cast(cf); + auto cfd = cfh->cfd(); + if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) { + auto mgcfd = new MultiGetColumnFamilyData(); + mgcfd->cfd = cfd; + multiget_cf_data.insert({cfd->GetID(), mgcfd}); } } @@ -2881,10 +2914,8 @@ std::vector DBImpl::MultiGet( snapshot = versions_->LastSequence(); } for (auto mgd_iter : multiget_cf_data) { - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(mgd_iter.first); - assert(cfd != nullptr); - mgd_iter.second->cfd = cfd; - mgd_iter.second->super_version = cfd->GetSuperVersion()->Ref(); + mgd_iter.second->super_version = + mgd_iter.second->cfd->GetSuperVersion()->Ref(); } mutex_.Unlock(); @@ -2910,7 +2941,8 @@ std::vector DBImpl::MultiGet( std::string* value = &(*values)[i]; LookupKey lkey(keys[i], snapshot); - auto mgd_iter = multiget_cf_data.find(column_family[i].id); + auto cfh = reinterpret_cast(column_family[i]); + auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); assert(mgd_iter != multiget_cf_data.end()); auto mgd = mgd_iter->second; auto super_version = mgd->super_version; @@ -2974,64 +3006,61 @@ std::vector DBImpl::MultiGet( Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, - ColumnFamilyHandle* handle) { - // whenever we are writing to column family set, we have to lock - versions_->GetColumnFamilySet()->Lock(); + ColumnFamilyHandle** handle) { + mutex_.Lock(); if (versions_->GetColumnFamilySet()->Exists(column_family_name)) { return Status::InvalidArgument("Column family already exists"); } VersionEdit edit; edit.AddColumnFamily(column_family_name); - handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); - edit.SetColumnFamily(handle->id); + uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); + edit.SetColumnFamily(new_id); - mutex_.Lock(); - Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_); + Status s = versions_->LogAndApply(default_cf_handle_->cfd(), &edit, &mutex_); if (s.ok()) { // add to internal data structures - versions_->CreateColumnFamily(options, &edit); + auto cfd = versions_->CreateColumnFamily(options, &edit); + *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); } mutex_.Unlock(); - versions_->GetColumnFamilySet()->Unlock(); Log(options_.info_log, "Created column family \"%s\"", column_family_name.c_str()); return s; } -Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) { - if (column_family.id == 0) { +Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); + if (cfd->GetID() == 0) { return Status::InvalidArgument("Can't drop default column family"); } - // whenever we are writing to column family set, we have to lock - versions_->GetColumnFamilySet()->Lock(); - if (!versions_->GetColumnFamilySet()->Exists(column_family.id)) { - return Status::NotFound("Column family not found"); - } + Log(options_.info_log, "Dropping column family with id %u\n", cfd->GetID()); + VersionEdit edit; edit.DropColumnFamily(); - edit.SetColumnFamily(column_family.id); - mutex_.Lock(); - Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_); - if (s.ok()) { - // remove from internal data structures - versions_->DropColumnFamily(&edit); - } - versions_->GetColumnFamilySet()->Unlock(); - DeletionState deletion_state; - FindObsoleteFiles(deletion_state, false, true); - mutex_.Unlock(); + edit.SetColumnFamily(cfd->GetID()); + + MutexLock l(&mutex_); + if (cfd->IsDropped()) { + return Status::InvalidArgument("Column family already dropped!\n"); + } + Status s = versions_->LogAndApply(cfd, &edit, &mutex_); + if (s.ok()) { + cfd->SetDropped(); + // DB is holding one reference to each column family when it's alive, + // need to drop it now + if (cfd->Unref()) { + delete cfd; + } + } - PurgeObsoleteFiles(deletion_state); - Log(options_.info_log, "Dropped column family with id %u\n", - column_family.id); return s; } bool DBImpl::KeyMayExist(const ReadOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, std::string* value, - bool* value_found) { + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool* value_found) { if (value_found != nullptr) { // falsify later if key-may-exist but can't fetch value *value_found = true; @@ -3047,12 +3076,12 @@ bool DBImpl::KeyMayExist(const ReadOptions& options, } Iterator* DBImpl::NewIterator(const ReadOptions& options, - const ColumnFamilyHandle& column_family) { + ColumnFamilyHandle* column_family) { SequenceNumber latest_snapshot = 0; SuperVersion* super_version = nullptr; + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); mutex_.Lock(); - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); - assert(cfd != nullptr); if (!options.tailing) { super_version = cfd->GetSuperVersion()->Ref(); latest_snapshot = versions_->LastSequence(); @@ -3083,7 +3112,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, Status DBImpl::NewIterators( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, std::vector* iterators) { // TODO return Status::NotSupported("Not yet!"); @@ -3100,20 +3129,15 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { } // Convenience methods -Status DBImpl::Put(const WriteOptions& o, - const ColumnFamilyHandle& column_family, const Slice& key, - const Slice& val) { +Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& val) { return DB::Put(o, column_family, key, val); } -Status DBImpl::Merge(const WriteOptions& o, - const ColumnFamilyHandle& column_family, const Slice& key, - const Slice& val) { - mutex_.Lock(); - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); - mutex_.Unlock(); - assert(cfd != nullptr); - if (!cfd->options()->merge_operator) { +Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& val) { + auto cfh = reinterpret_cast(column_family); + if (!cfh->cfd()->options()->merge_operator) { return Status::NotSupported("Provide a merge_operator when opening DB"); } else { return DB::Merge(o, column_family, key, val); @@ -3121,8 +3145,7 @@ Status DBImpl::Merge(const WriteOptions& o, } Status DBImpl::Delete(const WriteOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key) { + ColumnFamilyHandle* column_family, const Slice& key) { return DB::Delete(options, column_family, key); } @@ -3155,13 +3178,22 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } Status status; + autovector to_delete; + // refcounting cfd in iteration for (auto cfd : *versions_->GetColumnFamilySet()) { + cfd->Ref(); // May temporarily unlock and wait. status = MakeRoomForWrite(cfd, my_batch == nullptr); + if (cfd->Unref()) { + to_delete.push_back(cfd); + } if (!status.ok()) { break; } } + for (auto cfd : to_delete) { + delete cfd; + } uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions @@ -3221,13 +3253,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { if (status.ok()) { StopWatchNano write_memtable_timer(env_, false); - // reading the column family set outside of DB mutex -- should lock - versions_->GetColumnFamilySet()->Lock(); StartPerfTimer(&write_memtable_timer); status = WriteBatchInternal::InsertInto( updates, column_family_memtables_.get(), 0, this, false); BumpPerfTime(&perf_context.write_memtable_time, &write_memtable_timer); - versions_->GetColumnFamilySet()->Unlock(); if (!status.ok()) { // Panic for in-memory corruptions @@ -3536,29 +3565,28 @@ Env* DBImpl::GetEnv() const { return env_; } -const Options& DBImpl::GetOptions(const ColumnFamilyHandle& column_family) - const { - return *default_cfd_->full_options(); +const Options& DBImpl::GetOptions(ColumnFamilyHandle* column_family) const { + auto cfh = reinterpret_cast(column_family); + return *cfh->cfd()->full_options(); } -bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family, +bool DBImpl::GetProperty(ColumnFamilyHandle* column_family, const Slice& property, std::string* value) { value->clear(); + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); MutexLock l(&mutex_); - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); - assert(cfd != nullptr); return cfd->internal_stats()->GetProperty(property, value, cfd); } -void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family, +void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family, const Range* range, int n, uint64_t* sizes) { // TODO(opt): better implementation Version* v; + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); { MutexLock l(&mutex_); - auto cfd = - versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); - assert(cfd != nullptr); v = cfd->current(); v->Ref(); } @@ -3654,9 +3682,9 @@ Status DBImpl::DeleteFile(std::string name) { return status; } -void DBImpl::GetLiveFilesMetaData(std::vector *metadata) { +void DBImpl::GetLiveFilesMetaData(std::vector* metadata) { MutexLock l(&mutex_); - return versions_->GetLiveFilesMetaData(metadata); + versions_->GetLiveFilesMetaData(metadata); } Status DBImpl::GetDbIdentity(std::string& identity) { @@ -3688,38 +3716,40 @@ Status DBImpl::GetDbIdentity(std::string& identity) { // Default implementations of convenience methods that subclasses of DB // can call if they wish -Status DB::Put(const WriteOptions& opt, const ColumnFamilyHandle& column_family, +Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { // Pre-allocate size of write batch conservatively. // 8 bytes are taken by header, 4 bytes for count, 1 byte for type, // and we allocate 11 extra bytes for key length, as well as value length. WriteBatch batch(key.size() + value.size() + 24); - batch.Put(column_family.id, key, value); + auto cfh = reinterpret_cast(column_family); + batch.Put(cfh->cfd()->GetID(), key, value); return Write(opt, &batch); } -Status DB::Delete(const WriteOptions& opt, - const ColumnFamilyHandle& column_family, const Slice& key) { +Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, + const Slice& key) { WriteBatch batch; - batch.Delete(column_family.id, key); + auto cfh = reinterpret_cast(column_family); + batch.Delete(cfh->cfd()->GetID(), key); return Write(opt, &batch); } -Status DB::Merge(const WriteOptions& opt, - const ColumnFamilyHandle& column_family, const Slice& key, - const Slice& value) { +Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { WriteBatch batch; - batch.Merge(column_family.id, key, value); + auto cfh = reinterpret_cast(column_family); + batch.Merge(cfh->cfd()->GetID(), key, value); return Write(opt, &batch); } // Default implementation -- returns not supported status Status DB::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, - ColumnFamilyHandle* handle) { + ColumnFamilyHandle** handle) { return Status::NotSupported(""); } -Status DB::DropColumnFamily(const ColumnFamilyHandle& column_family) { +Status DB::DropColumnFamily(ColumnFamilyHandle* column_family) { return Status::NotSupported(""); } @@ -3731,14 +3761,22 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { std::vector column_families; column_families.push_back( ColumnFamilyDescriptor(default_column_family_name, cf_options)); - std::vector handles; - return DB::Open(db_options, dbname, column_families, &handles, dbptr); + std::vector handles; + Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr); + if (s.ok()) { + assert(handles.size() == 1); + // i can delete the handle since DBImpl is always holding a reference to + // default column family + delete handles[0]; + } + return s; } Status DB::Open(const DBOptions& db_options, const std::string& dbname, const std::vector& column_families, - std::vector* handles, DB** dbptr) { + std::vector* handles, DB** dbptr) { *dbptr = nullptr; + handles->clear(); EnvOptions soptions; size_t max_write_buffer_size = 0; @@ -3784,20 +3822,22 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, // that we used by calling impl->versions_->NewFileNumber() // The used log number are already written to manifest in RecoverLogFile() // method - s = impl->versions_->LogAndApply(impl->default_cfd_, &edit, &impl->mutex_, + s = impl->versions_->LogAndApply(impl->default_cf_handle_->cfd(), &edit, + &impl->mutex_, impl->db_directory_.get()); } if (s.ok()) { // set column family handles - handles->clear(); for (auto cf : column_families) { if (!impl->versions_->GetColumnFamilySet()->Exists(cf.name)) { s = Status::InvalidArgument("Column family not found: ", cf.name); - handles->clear(); break; } uint32_t id = impl->versions_->GetColumnFamilySet()->GetID(cf.name); - handles->push_back(ColumnFamilyHandle(id)); + auto cfd = impl->versions_->GetColumnFamilySet()->GetColumnFamily(id); + assert(cfd != nullptr); + handles->push_back( + new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); } } if (s.ok()) { @@ -3836,6 +3876,9 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, if (s.ok()) { *dbptr = impl; } else { + for (auto h : *handles) { + delete h; + } handles->clear(); delete impl; } diff --git a/db/db_impl.h b/db/db_impl.h index bb32ea046..3c971391c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -45,32 +45,31 @@ class DBImpl : public DB { // Implementations of the DB interface using DB::Put; virtual Status Put(const WriteOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, const Slice& value); using DB::Merge; virtual Status Merge(const WriteOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value); + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value); using DB::Delete; virtual Status Delete(const WriteOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key); + ColumnFamilyHandle* column_family, const Slice& key); using DB::Write; virtual Status Write(const WriteOptions& options, WriteBatch* updates); using DB::Get; virtual Status Get(const ReadOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, std::string* value); using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, const std::vector& keys, std::vector* values); virtual Status CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family, - ColumnFamilyHandle* handle); - virtual Status DropColumnFamily(const ColumnFamilyHandle& column_family); + ColumnFamilyHandle** handle); + virtual Status DropColumnFamily(ColumnFamilyHandle* column_family); // Returns false if key doesn't exist in the database and true if it may. // If value_found is not passed in as null, then return the value if found in @@ -78,43 +77,41 @@ class DBImpl : public DB { // , otherwise false. using DB::KeyMayExist; virtual bool KeyMayExist(const ReadOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, std::string* value, - bool* value_found = nullptr); + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool* value_found = nullptr); using DB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& options, - const ColumnFamilyHandle& column_family); + ColumnFamilyHandle* column_family); virtual Status NewIterators( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, std::vector* iterators); virtual const Snapshot* GetSnapshot(); virtual void ReleaseSnapshot(const Snapshot* snapshot); using DB::GetProperty; - virtual bool GetProperty(const ColumnFamilyHandle& column_family, + virtual bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property, std::string* value); using DB::GetApproximateSizes; - virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family, + virtual void GetApproximateSizes(ColumnFamilyHandle* column_family, const Range* range, int n, uint64_t* sizes); using DB::CompactRange; - virtual Status CompactRange(const ColumnFamilyHandle& column_family, + virtual Status CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end, bool reduce_level = false, int target_level = -1); using DB::NumberLevels; - virtual int NumberLevels(const ColumnFamilyHandle& column_family); + virtual int NumberLevels(ColumnFamilyHandle* column_family); using DB::MaxMemCompactionLevel; - virtual int MaxMemCompactionLevel(const ColumnFamilyHandle& column_family); + virtual int MaxMemCompactionLevel(ColumnFamilyHandle* column_family); using DB::Level0StopWriteTrigger; - virtual int Level0StopWriteTrigger(const ColumnFamilyHandle& column_family); + virtual int Level0StopWriteTrigger(ColumnFamilyHandle* column_family); virtual const std::string& GetName() const; virtual Env* GetEnv() const; using DB::GetOptions; - virtual const Options& GetOptions(const ColumnFamilyHandle& column_family) - const; + virtual const Options& GetOptions(ColumnFamilyHandle* column_family) const; using DB::Flush; virtual Status Flush(const FlushOptions& options, - const ColumnFamilyHandle& column_family); + ColumnFamilyHandle* column_family); virtual Status DisableFileDeletions(); virtual Status EnableFileDeletions(bool force); // All the returned filenames start with "/" @@ -246,6 +243,8 @@ class DBImpl : public DB { // It is not necessary to hold the mutex when invoking this method. void PurgeObsoleteFiles(DeletionState& deletion_state); + ColumnFamilyHandle* DefaultColumnFamily() const; + protected: Env* const env_; const std::string dbname_; @@ -381,7 +380,7 @@ class DBImpl : public DB { port::CondVar bg_cv_; // Signalled when background work finishes uint64_t logfile_number_; unique_ptr log_; - ColumnFamilyData* default_cfd_; + ColumnFamilyHandleImpl* default_cf_handle_; unique_ptr column_family_memtables_; std::string host_name_; @@ -493,9 +492,9 @@ class DBImpl : public DB { // Function that Get and KeyMayExist call with no_io true or false // Note: 'value_found' from KeyMayExist propagates here - Status GetImpl(const ReadOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, - std::string* value, bool* value_found = nullptr); + Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, std::string* value, + bool* value_found = nullptr); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index d7c50739e..b179ff5f8 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -29,7 +29,6 @@ #include "db/write_batch_internal.h" #include "rocksdb/db.h" #include "rocksdb/env.h" -#include "rocksdb/column_family.h" #include "rocksdb/status.h" #include "rocksdb/table.h" #include "rocksdb/merge_operator.h" @@ -54,11 +53,12 @@ DBImplReadOnly::~DBImplReadOnly() { // Implementations of the DB interface Status DBImplReadOnly::Get(const ReadOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, std::string* value) { + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) { Status s; SequenceNumber snapshot = versions_->LastSequence(); - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); SuperVersion* super_version = cfd->GetSuperVersion(); MergeContext merge_context; LookupKey lkey(key, snapshot); @@ -73,9 +73,9 @@ Status DBImplReadOnly::Get(const ReadOptions& options, } Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options, - const ColumnFamilyHandle& column_family) { - auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); - assert(cfd != nullptr); + ColumnFamilyHandle* column_family) { + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); SequenceNumber latest_snapshot = versions_->LastSequence(); Iterator* internal_iter = NewInternalIterator(options, cfd, super_version); diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index d48cd5d65..c4703ba69 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -32,18 +32,18 @@ class DBImplReadOnly : public DBImpl { // Implementations of the DB interface using DB::Get; virtual Status Get(const ReadOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, std::string* value); // TODO: Implement ReadOnly MultiGet? using DBImpl::NewIterator; virtual Iterator* NewIterator(const ReadOptions&, - const ColumnFamilyHandle& column_family); + ColumnFamilyHandle* column_family); virtual Status NewIterators( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, std::vector* iterators) { // TODO return Status::NotSupported("Not supported yet."); @@ -51,27 +51,26 @@ class DBImplReadOnly : public DBImpl { using DBImpl::Put; virtual Status Put(const WriteOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { return Status::NotSupported("Not supported operation in read only mode."); } using DBImpl::Merge; virtual Status Merge(const WriteOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) { + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) { return Status::NotSupported("Not supported operation in read only mode."); } using DBImpl::Delete; virtual Status Delete(const WriteOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key) { + ColumnFamilyHandle* column_family, const Slice& key) { return Status::NotSupported("Not supported operation in read only mode."); } virtual Status Write(const WriteOptions& options, WriteBatch* updates) { return Status::NotSupported("Not supported operation in read only mode."); } using DBImpl::CompactRange; - virtual Status CompactRange(const ColumnFamilyHandle& column_family, + virtual Status CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end, bool reduce_level = false, int target_level = -1) { @@ -90,7 +89,7 @@ class DBImplReadOnly : public DBImpl { } using DBImpl::Flush; virtual Status Flush(const FlushOptions& options, - const ColumnFamilyHandle& column_family) { + ColumnFamilyHandle* column_family) { return Status::NotSupported("Not supported operation in read only mode."); } diff --git a/db/db_stats_logger.cc b/db/db_stats_logger.cc index 30c2a6384..46918d4e7 100644 --- a/db/db_stats_logger.cc +++ b/db/db_stats_logger.cc @@ -65,7 +65,7 @@ void DBImpl::LogDBDeployStats() { uint64_t file_total_size = 0; uint32_t file_total_num = 0; - Version* current = default_cfd_->current(); + Version* current = default_cf_handle_->cfd()->current(); for (int i = 0; i < current->NumberLevels(); i++) { file_total_num += current->NumLevelFiles(i); file_total_size += current->NumLevelBytes(i); diff --git a/db/db_test.cc b/db/db_test.cc index 1edb14799..d525314f7 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4741,24 +4741,30 @@ class ModelDB: public DB { KVMap map_; }; - explicit ModelDB(const Options& options): options_(options) { } + explicit ModelDB(const Options& options) : options_(options) {} using DB::Put; - virtual Status Put(const WriteOptions& o, const ColumnFamilyHandle& cf, + virtual Status Put(const WriteOptions& o, ColumnFamilyHandle* cf, const Slice& k, const Slice& v) { - return DB::Put(o, cf, k, v); + WriteBatch batch; + batch.Put(0, k, v); + return Write(o, &batch); } using DB::Merge; - virtual Status Merge(const WriteOptions& o, const ColumnFamilyHandle& cf, + virtual Status Merge(const WriteOptions& o, ColumnFamilyHandle* cf, const Slice& k, const Slice& v) { - return DB::Merge(o, cf, k, v); + WriteBatch batch; + batch.Merge(0, k, v); + return Write(o, &batch); } using DB::Delete; - virtual Status Delete(const WriteOptions& o, const ColumnFamilyHandle& cf, + virtual Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf, const Slice& key) { - return DB::Delete(o, cf, key); + WriteBatch batch; + batch.Delete(0, key); + return Write(o, &batch); } using DB::Get; - virtual Status Get(const ReadOptions& options, const ColumnFamilyHandle& cf, + virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf, const Slice& key, std::string* value) { return Status::NotSupported(key); } @@ -4766,7 +4772,7 @@ class ModelDB: public DB { using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, const std::vector& keys, std::vector* values) { std::vector s(keys.size(), Status::NotSupported("Not implemented.")); @@ -4774,9 +4780,8 @@ class ModelDB: public DB { } using DB::KeyMayExist; virtual bool KeyMayExist(const ReadOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, std::string* value, - bool* value_found = nullptr) { + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool* value_found = nullptr) { if (value_found != nullptr) { *value_found = false; } @@ -4784,7 +4789,7 @@ class ModelDB: public DB { } using DB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& options, - const ColumnFamilyHandle& column_family) { + ColumnFamilyHandle* column_family) { if (options.snapshot == nullptr) { KVMap* saved = new KVMap; *saved = map_; @@ -4797,7 +4802,7 @@ class ModelDB: public DB { } virtual Status NewIterators( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, std::vector* iterators) { return Status::NotSupported("Not supported yet"); } @@ -4831,36 +4836,34 @@ class ModelDB: public DB { } using DB::GetProperty; - virtual bool GetProperty(const ColumnFamilyHandle& column_family, + virtual bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property, std::string* value) { return false; } using DB::GetApproximateSizes; - virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family, + virtual void GetApproximateSizes(ColumnFamilyHandle* column_family, const Range* range, int n, uint64_t* sizes) { for (int i = 0; i < n; i++) { sizes[i] = 0; } } using DB::CompactRange; - virtual Status CompactRange(const ColumnFamilyHandle& column_family, + virtual Status CompactRange(ColumnFamilyHandle* column_family, const Slice* start, const Slice* end, bool reduce_level, int target_level) { return Status::NotSupported("Not supported operation."); } using DB::NumberLevels; - virtual int NumberLevels(const ColumnFamilyHandle& column_family) { - return 1; - } + virtual int NumberLevels(ColumnFamilyHandle* column_family) { return 1; } using DB::MaxMemCompactionLevel; - virtual int MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) { + virtual int MaxMemCompactionLevel(ColumnFamilyHandle* column_family) { return 1; } using DB::Level0StopWriteTrigger; - virtual int Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) { + virtual int Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { return -1; } @@ -4873,14 +4876,13 @@ class ModelDB: public DB { } using DB::GetOptions; - virtual const Options& GetOptions(const ColumnFamilyHandle& column_family) - const { + virtual const Options& GetOptions(ColumnFamilyHandle* column_family) const { return options_; } using DB::Flush; virtual Status Flush(const rocksdb::FlushOptions& options, - const ColumnFamilyHandle& column_family) { + ColumnFamilyHandle* column_family) { Status ret; return ret; } @@ -4916,6 +4918,8 @@ class ModelDB: public DB { return Status::NotSupported("Not supported in Model DB"); } + virtual ColumnFamilyHandle* DefaultColumnFamily() const { return nullptr; } + private: class ModelIter: public Iterator { public: diff --git a/db/version_set.cc b/db/version_set.cc index 228d323b7..d9fbce255 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1396,9 +1396,6 @@ VersionSet::VersionSet(const std::string& dbname, const DBOptions* options, storage_options_compactions_(storage_options_) {} VersionSet::~VersionSet() { - for (auto cfd : *column_family_set_) { - cfd->current()->Unref(); - } // we need to delete column_family_set_ because its destructor depends on // VersionSet column_family_set_.reset(); @@ -1434,6 +1431,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, bool new_descriptor_log) { mu->AssertHeld(); + if (column_family_data->IsDropped()) { + // no need to write anything to the manifest + return Status::OK(); + } + // queue our request ManifestWriter w(mu, column_family_data, edit); manifest_writers_.push_back(&w); @@ -1759,7 +1761,13 @@ Status VersionSet::Recover( assert(builder != builders.end()); delete builder->second; builders.erase(builder); - DropColumnFamily(&edit); + auto cfd = column_family_set_->GetColumnFamily(edit.column_family_); + if (cfd->Unref()) { + delete cfd; + } else { + // who else can have reference to cfd!? + assert(false); + } } else if (cf_in_not_found) { column_families_not_found.erase(edit.column_family_); } else { @@ -1921,17 +1929,17 @@ Status VersionSet::ListColumnFamilies(std::vector* column_families, Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { - VersionEdit edit; - s = edit.DecodeFrom(record); - if (!s.ok()) { - break; - } - if (edit.is_column_family_add_) { - column_family_names.insert( - {edit.column_family_, edit.column_family_name_}); - } else if (edit.is_column_family_drop_) { - column_family_names.erase(edit.column_family_); - } + VersionEdit edit; + s = edit.DecodeFrom(record); + if (!s.ok()) { + break; + } + if (edit.is_column_family_add_) { + column_family_names.insert( + {edit.column_family_, edit.column_family_name_}); + } else if (edit.is_column_family_drop_) { + column_family_names.erase(edit.column_family_); + } } column_families->clear(); @@ -2433,8 +2441,4 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( return new_cfd; } -void VersionSet::DropColumnFamily(VersionEdit* edit) { - column_family_set_->DropColumnFamily(edit->column_family_); -} - } // namespace rocksdb diff --git a/db/version_set.h b/db/version_set.h index e2cbd5643..aa29640b2 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -396,8 +396,6 @@ class VersionSet { ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& options, VersionEdit* edit); - void DropColumnFamily(VersionEdit* edit); - ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); } private: diff --git a/db/write_batch.cc b/db/write_batch.cc index 084091aad..827f60aa7 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -277,7 +277,13 @@ class MemTableInserter : public WriteBatch::Handler { std::string prev_value; std::string merged_value; - Status s = db_->Get(ropts, key, &prev_value); + + auto cf_handle = cf_mems_->GetColumnFamilyHandle(); + if (cf_handle == nullptr) { + cf_handle = db_->DefaultColumnFamily(); + } + Status s = db_->Get(ropts, cf_handle, key, &prev_value); + char* prev_buffer = const_cast(prev_value.c_str()); uint32_t prev_size = prev_value.size(); auto status = options->inplace_callback(s.ok() ? prev_buffer : nullptr, @@ -333,8 +339,11 @@ class MemTableInserter : public WriteBatch::Handler { ReadOptions read_options; read_options.snapshot = &read_from_snapshot; - db_->Get(read_options, cf_mems_->GetColumnFamilyHandle(), key, - &get_value); + auto cf_handle = cf_mems_->GetColumnFamilyHandle(); + if (cf_handle == nullptr) { + cf_handle = db_->DefaultColumnFamily(); + } + db_->Get(read_options, cf_handle, key, &get_value); Slice get_value_slice = Slice(get_value); // 2) Apply this merge @@ -378,8 +387,11 @@ class MemTableInserter : public WriteBatch::Handler { ReadOptions ropts; ropts.snapshot = &read_from_snapshot; std::string value; - if (!db_->KeyMayExist(ropts, cf_mems_->GetColumnFamilyHandle(), key, - &value)) { + auto cf_handle = cf_mems_->GetColumnFamilyHandle(); + if (cf_handle == nullptr) { + cf_handle = db_->DefaultColumnFamily(); + } + if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) { RecordTick(options->statistics.get(), NUMBER_FILTERED_DELETES); return; } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 20ddf80e5..7db21f1a3 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -12,7 +12,6 @@ #include "rocksdb/write_batch.h" #include "rocksdb/db.h" #include "rocksdb/options.h" -#include "rocksdb/column_family.h" namespace rocksdb { @@ -28,7 +27,7 @@ class ColumnFamilyMemTables { virtual uint64_t GetLogNumber() const = 0; virtual MemTable* GetMemTable() const = 0; virtual const Options* GetFullOptions() const = 0; - virtual const ColumnFamilyHandle& GetColumnFamilyHandle() const = 0; + virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0; }; class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { @@ -53,9 +52,7 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { return options_; } - const ColumnFamilyHandle& GetColumnFamilyHandle() const override { - return default_column_family; - } + ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; } private: bool ok_; diff --git a/include/rocksdb/column_family.h b/include/rocksdb/column_family.h deleted file mode 100644 index b6e24e70f..000000000 --- a/include/rocksdb/column_family.h +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - -#pragma once -#include "rocksdb/slice.h" -#include - -namespace rocksdb { - -// Column family's name is translated to ColumnFamilyHandle at DB open or column -// family open time. Clients use ColumnFamilyHandle to comunicate with the DB -// -// Column family names that start with "." (a dot) are system specific and -// should not be used by the clients - -struct ColumnFamilyHandle { - uint32_t id; - // default - ColumnFamilyHandle() : id() {} - explicit ColumnFamilyHandle(uint32_t _id) : id(_id) {} -}; - -const ColumnFamilyHandle default_column_family = ColumnFamilyHandle(); -extern const std::string default_column_family_name; - -} // namespace rocksdb diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index b963a66c0..2f348d54f 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -23,8 +23,15 @@ namespace rocksdb { using std::unique_ptr; -struct ColumnFamilyHandle; -extern const ColumnFamilyHandle default_column_family; +class ColumnFamilyHandle { + public: + ColumnFamilyHandle() {} + virtual ~ColumnFamilyHandle() {} + + private: + ColumnFamilyHandle(const ColumnFamilyHandle&); // no copying +}; +extern const std::string default_column_family_name; struct ColumnFamilyDescriptor { std::string name; @@ -106,7 +113,7 @@ class DB { // will use to operate on column family column_family[i] static Status Open(const DBOptions& db_options, const std::string& name, const std::vector& column_families, - std::vector* handles, DB** dbptr); + std::vector* handles, DB** dbptr); // ListColumnFamilies will open the DB specified by argument name // and return the list of all column families in that DB @@ -123,23 +130,22 @@ class DB { // through the argument handle. virtual Status CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, - ColumnFamilyHandle* handle); + ColumnFamilyHandle** handle); - // Drop a column family specified by column_family handle. - // All data related to the column family will be deleted before - // the function returns. - // Calls referring to the dropped column family will fail. - virtual Status DropColumnFamily(const ColumnFamilyHandle& column_family); + // Drop a column family specified by column_family handle. This call + // only records a drop record in the manifest and prevents the column + // family from flushing and compacting. + virtual Status DropColumnFamily(ColumnFamilyHandle* column_family); // Set the database entry for "key" to "value". // Returns OK on success, and a non-OK status on error. // Note: consider setting options.sync = true. virtual Status Put(const WriteOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) = 0; Status Put(const WriteOptions& options, const Slice& key, const Slice& value) { - return Put(options, default_column_family, key, value); + return Put(options, DefaultColumnFamily(), key, value); } // Remove the database entry (if any) for "key". Returns OK on @@ -147,10 +153,10 @@ class DB { // did not exist in the database. // Note: consider setting options.sync = true. virtual Status Delete(const WriteOptions& options, - const ColumnFamilyHandle& column_family, + ColumnFamilyHandle* column_family, const Slice& key) = 0; Status Delete(const WriteOptions& options, const Slice& key) { - return Delete(options, default_column_family, key); + return Delete(options, DefaultColumnFamily(), key); } // Merge the database entry for "key" with "value". Returns OK on success, @@ -158,11 +164,11 @@ class DB { // determined by the user provided merge_operator when opening DB. // Note: consider setting options.sync = true. virtual Status Merge(const WriteOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) = 0; + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) = 0; Status Merge(const WriteOptions& options, const Slice& key, const Slice& value) { - return Merge(options, default_column_family, key, value); + return Merge(options, DefaultColumnFamily(), key, value); } // Apply the specified updates to the database. @@ -178,10 +184,10 @@ class DB { // // May return some other Status on an error. virtual Status Get(const ReadOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, std::string* value) = 0; Status Get(const ReadOptions& options, const Slice& key, std::string* value) { - return Get(options, default_column_family, key, value); + return Get(options, DefaultColumnFamily(), key, value); } // If keys[i] does not exist in the database, then the i'th returned @@ -196,13 +202,13 @@ class DB { // duplicate values in order. virtual std::vector MultiGet( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, const std::vector& keys, std::vector* values) = 0; std::vector MultiGet(const ReadOptions& options, const std::vector& keys, std::vector* values) { - return MultiGet(options, std::vector( - keys.size(), default_column_family), + return MultiGet(options, std::vector( + keys.size(), DefaultColumnFamily()), keys, values); } @@ -214,9 +220,8 @@ class DB { // to make this lighter weight is to avoid doing any IOs. // Default implementation here returns true and sets 'value_found' to false virtual bool KeyMayExist(const ReadOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, std::string* value, - bool* value_found = nullptr) { + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool* value_found = nullptr) { if (value_found != nullptr) { *value_found = false; } @@ -224,7 +229,7 @@ class DB { } bool KeyMayExist(const ReadOptions& options, const Slice& key, std::string* value, bool* value_found = nullptr) { - return KeyMayExist(options, default_column_family, key, value, value_found); + return KeyMayExist(options, DefaultColumnFamily(), key, value, value_found); } // Return a heap-allocated iterator over the contents of the database. @@ -234,16 +239,16 @@ class DB { // Caller should delete the iterator when it is no longer needed. // The returned iterator should be deleted before this db is deleted. virtual Iterator* NewIterator(const ReadOptions& options, - const ColumnFamilyHandle& column_family) = 0; + ColumnFamilyHandle* column_family) = 0; Iterator* NewIterator(const ReadOptions& options) { - return NewIterator(options, default_column_family); + return NewIterator(options, DefaultColumnFamily()); } // Returns iterators from a consistent database state across multiple // column families. Iterators are heap allocated and need to be deleted // before the db is deleted virtual Status NewIterators( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, std::vector* iterators) = 0; // Return a handle to the current DB state. Iterators created with @@ -270,10 +275,10 @@ class DB { // about the internal operation of the DB. // "rocksdb.sstables" - returns a multi-line string that describes all // of the sstables that make up the db contents. - virtual bool GetProperty(const ColumnFamilyHandle& column_family, + virtual bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property, std::string* value) = 0; bool GetProperty(const Slice& property, std::string* value) { - return GetProperty(default_column_family, property, value); + return GetProperty(DefaultColumnFamily(), property, value); } // For each i in [0,n-1], store in "sizes[i]", the approximate @@ -284,11 +289,11 @@ class DB { // sizes will be one-tenth the size of the corresponding user data size. // // The results may not include the sizes of recently written data. - virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family, + virtual void GetApproximateSizes(ColumnFamilyHandle* column_family, const Range* range, int n, uint64_t* sizes) = 0; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { - GetApproximateSizes(default_column_family, range, n, sizes); + GetApproximateSizes(DefaultColumnFamily(), range, n, sizes); } // Compact the underlying storage for the key range [*begin,*end]. @@ -308,35 +313,31 @@ class DB { // hosting all the files. In this case, client could set reduce_level // to true, to move the files back to the minimum level capable of holding // the data set or a given level (specified by non-negative target_level). - virtual Status CompactRange(const ColumnFamilyHandle& column_family, + virtual Status CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end, bool reduce_level = false, int target_level = -1) = 0; Status CompactRange(const Slice* begin, const Slice* end, bool reduce_level = false, int target_level = -1) { - return CompactRange(default_column_family, begin, end, reduce_level, + return CompactRange(DefaultColumnFamily(), begin, end, reduce_level, target_level); } // Number of levels used for this DB. - virtual int NumberLevels(const ColumnFamilyHandle& column_family) = 0; - int NumberLevels() { - return NumberLevels(default_column_family); - } + virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0; + int NumberLevels() { return NumberLevels(DefaultColumnFamily()); } // Maximum level to which a new compacted memtable is pushed if it // does not create overlap. - virtual int MaxMemCompactionLevel( - const ColumnFamilyHandle& column_family) = 0; + virtual int MaxMemCompactionLevel(ColumnFamilyHandle* column_family) = 0; int MaxMemCompactionLevel() { - return MaxMemCompactionLevel(default_column_family); + return MaxMemCompactionLevel(DefaultColumnFamily()); } // Number of files in level-0 that would stop writes. - virtual int Level0StopWriteTrigger( - const ColumnFamilyHandle& column_family) = 0; + virtual int Level0StopWriteTrigger(ColumnFamilyHandle* column_family) = 0; int Level0StopWriteTrigger() { - return Level0StopWriteTrigger(default_column_family); + return Level0StopWriteTrigger(DefaultColumnFamily()); } // Get DB name -- the exact same name that was provided as an argument to @@ -347,17 +348,17 @@ class DB { virtual Env* GetEnv() const = 0; // Get DB Options that we use - virtual const Options& GetOptions(const ColumnFamilyHandle& column_family) + virtual const Options& GetOptions(ColumnFamilyHandle* column_family) const = 0; const Options& GetOptions() const { - return GetOptions(default_column_family); + return GetOptions(DefaultColumnFamily()); } // Flush all mem-table data. virtual Status Flush(const FlushOptions& options, - const ColumnFamilyHandle& column_family) = 0; + ColumnFamilyHandle* column_family) = 0; Status Flush(const FlushOptions& options) { - return Flush(options, default_column_family); + return Flush(options, DefaultColumnFamily()); } // Prevent file deletions. Compactions will continue to occur, @@ -426,6 +427,9 @@ class DB { // be set properly virtual Status GetDbIdentity(std::string& identity) = 0; + // Returns default column family handle + virtual ColumnFamilyHandle* DefaultColumnFamily() const = 0; + private: // No copying allowed DB(const DB&); diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index a0072ce68..238d6d5f5 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -27,7 +27,6 @@ #include #include "rocksdb/status.h" -#include "rocksdb/column_family.h" namespace rocksdb { diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index 7f91cd5ad..e7c9583b2 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -23,14 +23,14 @@ class StackableDB : public DB { using DB::Put; virtual Status Put(const WriteOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) override { return db_->Put(options, column_family, key, val); } using DB::Get; virtual Status Get(const ReadOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, std::string* value) override { return db_->Get(options, column_family, key, value); } @@ -38,7 +38,7 @@ class StackableDB : public DB { using DB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, const std::vector& keys, std::vector* values) override { return db_->MultiGet(options, column_family, keys, values); @@ -46,23 +46,23 @@ class StackableDB : public DB { using DB::KeyMayExist; virtual bool KeyMayExist(const ReadOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, std::string* value, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool* value_found = nullptr) override { return db_->KeyMayExist(options, column_family, key, value, value_found); } using DB::Delete; virtual Status Delete(const WriteOptions& wopts, - const ColumnFamilyHandle& column_family, + ColumnFamilyHandle* column_family, const Slice& key) override { return db_->Delete(wopts, column_family, key); } using DB::Merge; virtual Status Merge(const WriteOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) override { + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override { return db_->Merge(options, column_family, key, value); } @@ -74,14 +74,13 @@ class StackableDB : public DB { using DB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& opts, - const ColumnFamilyHandle& column_family) - override { + ColumnFamilyHandle* column_family) override { return db_->NewIterator(opts, column_family); } virtual Status NewIterators( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, std::vector* iterators) { return db_->NewIterators(options, column_family, iterators); } @@ -96,20 +95,20 @@ class StackableDB : public DB { } using DB::GetProperty; - virtual bool GetProperty(const ColumnFamilyHandle& column_family, + virtual bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property, std::string* value) override { return db_->GetProperty(column_family, property, value); } using DB::GetApproximateSizes; - virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family, + virtual void GetApproximateSizes(ColumnFamilyHandle* column_family, const Range* r, int n, uint64_t* sizes) override { return db_->GetApproximateSizes(column_family, r, n, sizes); } using DB::CompactRange; - virtual Status CompactRange(const ColumnFamilyHandle& column_family, + virtual Status CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end, bool reduce_level = false, int target_level = -1) override { @@ -118,18 +117,18 @@ class StackableDB : public DB { } using DB::NumberLevels; - virtual int NumberLevels(const ColumnFamilyHandle& column_family) override { + virtual int NumberLevels(ColumnFamilyHandle* column_family) override { return db_->NumberLevels(column_family); } using DB::MaxMemCompactionLevel; - virtual int MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) + virtual int MaxMemCompactionLevel(ColumnFamilyHandle* column_family) override { return db_->MaxMemCompactionLevel(column_family); } using DB::Level0StopWriteTrigger; - virtual int Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) + virtual int Level0StopWriteTrigger(ColumnFamilyHandle* column_family) override { return db_->Level0StopWriteTrigger(column_family); } @@ -143,14 +142,14 @@ class StackableDB : public DB { } using DB::GetOptions; - virtual const Options& GetOptions(const ColumnFamilyHandle& column_family) - const override { + virtual const Options& GetOptions(ColumnFamilyHandle* column_family) const + override { return db_->GetOptions(column_family); } using DB::Flush; virtual Status Flush(const FlushOptions& fopts, - const ColumnFamilyHandle& column_family) override { + ColumnFamilyHandle* column_family) override { return db_->Flush(fopts, column_family); } @@ -189,6 +188,10 @@ class StackableDB : public DB { return db_->GetUpdatesSince(seq_number, iter); } + virtual ColumnFamilyHandle* DefaultColumnFamily() const override { + return db_->DefaultColumnFamily(); + } + protected: DB* db_; }; diff --git a/table/filter_block.h b/table/filter_block.h index da19d42e9..05c2bb943 100644 --- a/table/filter_block.h +++ b/table/filter_block.h @@ -46,6 +46,9 @@ class FilterBlockBuilder { bool SamePrefix(const Slice &key1, const Slice &key2) const; void GenerateFilter(); + // important: all of these might point to invalid addresses + // at the time of destruction of this filter block. destructor + // should NOT dereference them. const FilterPolicy* policy_; const SliceTransform* prefix_extractor_; bool whole_key_filtering_; diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index f3a0e2425..870abd01e 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -11,7 +11,6 @@ #include "db/filename.h" #include "db/write_batch_internal.h" #include "rocksdb/write_batch.h" -#include "rocksdb/column_family.h" #include "rocksdb/cache.h" #include "util/coding.h" diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index eb4a7d79b..5f8614bfe 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -45,8 +45,8 @@ class DummyDB : public StackableDB { } using DB::GetOptions; - virtual const Options& GetOptions(const ColumnFamilyHandle& column_family) - const override { + virtual const Options& GetOptions(ColumnFamilyHandle* column_family) const + override { return options_; } @@ -70,6 +70,10 @@ class DummyDB : public StackableDB { return Status::OK(); } + virtual ColumnFamilyHandle* DefaultColumnFamily() const override { + return nullptr; + } + class DummyLogFile : public LogFile { public: /* implicit */ diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 725db4fdc..589148f48 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -120,7 +120,7 @@ Status DBWithTTL::StripTS(std::string* str) { } Status DBWithTTL::Put(const WriteOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { WriteBatch batch; batch.Put(key, val); @@ -128,7 +128,7 @@ Status DBWithTTL::Put(const WriteOptions& options, } Status DBWithTTL::Get(const ReadOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, std::string* value) { Status st = db_->Get(options, key, value); if (!st.ok()) { @@ -143,7 +143,7 @@ Status DBWithTTL::Get(const ReadOptions& options, std::vector DBWithTTL::MultiGet( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, const std::vector& keys, std::vector* values) { return std::vector(keys.size(), Status::NotSupported("MultiGet not\ @@ -151,9 +151,8 @@ std::vector DBWithTTL::MultiGet( } bool DBWithTTL::KeyMayExist(const ReadOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, std::string* value, - bool* value_found) { + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool* value_found) { bool ret = db_->KeyMayExist(options, key, value, value_found); if (ret && value != nullptr && value_found != nullptr && *value_found) { if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) { @@ -164,8 +163,8 @@ bool DBWithTTL::KeyMayExist(const ReadOptions& options, } Status DBWithTTL::Merge(const WriteOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) { + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) { WriteBatch batch; batch.Merge(key, value); return Write(options, &batch); @@ -211,7 +210,7 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { } Iterator* DBWithTTL::NewIterator(const ReadOptions& opts, - const ColumnFamilyHandle& column_family) { + ColumnFamilyHandle* column_family) { return new TtlIterator(db_->NewIterator(opts, column_family)); } diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 3d3dd2ad8..9a1d96a93 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -22,38 +22,37 @@ class DBWithTTL : public StackableDB { using StackableDB::Put; virtual Status Put(const WriteOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) override; using StackableDB::Get; virtual Status Get(const ReadOptions& options, - const ColumnFamilyHandle& column_family, const Slice& key, + ColumnFamilyHandle* column_family, const Slice& key, std::string* value) override; using StackableDB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, - const std::vector& column_family, + const std::vector& column_family, const std::vector& keys, std::vector* values) override; using StackableDB::KeyMayExist; virtual bool KeyMayExist(const ReadOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, std::string* value, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool* value_found = nullptr) override; using StackableDB::Merge; virtual Status Merge(const WriteOptions& options, - const ColumnFamilyHandle& column_family, - const Slice& key, const Slice& value) override; + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; using StackableDB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& opts, - const ColumnFamilyHandle& column_family) - override; + ColumnFamilyHandle* column_family) override; // Simulate a db crash, no elegant closing of database. void TEST_Destroy_DBWithTtl();